package cn.piflow.shell

import java.io.File

import cn.piflow.FlowEngine
import cn.piflow.rpc.{ClassLoaderService, HttpRpcServer, RemoteFlowEngine}
import cn.piflow.util.Logging
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.spark.sql.SparkSession

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.ILoop
import scala.xml.XML

class Shell extends Logging {
	lazy val properties = {
		//load properties from properties.xml
		val xml = XML.loadString(IOUtils.toString(this.getClass.getResource("/properties.xml").openStream()));
		val elements = xml \ "property";
		val map = collection.mutable.Map[String, String]();
		elements.map { x ⇒
			map(x.attribute("name").get.text) = x.attribute("value").getOrElse(x).text;
		}
		map;
	}

	def run(classPath: String = null) {
		val (spark, engine) = FlowEngine.startByConf();

		val repl: ILoop = new ILoop {
			override def createInterpreter() = {
				super.createInterpreter();

				intp.beQuietDuring {
					intp.bind("engine", classOf[FlowEngine].getName, engine, List("implicit"));
					intp.bind("spark", classOf[SparkSession].getName, spark, List("implicit"));
					intp.bind("jobs", new cn.piflow.shell.cmd.JobCmd(engine));
					intp.bind("store", new cn.piflow.shell.cmd.StoreCmd(engine));
				}
				val text = IOUtils.toString(this.getClass.getResource("/predefined.scala").openStream());
				val script = text.replaceAll("class\\s+PRELOAD_CODES.*\\{([\\s\\S]*)\\}", "$1");
				intp.quietRun(script);
			}

			override def printWelcome(): Unit = {
				println(properties("WelcomeMessage"));
			}

			val promptMsg = properties("ShellPrompt");

			override def prompt = promptMsg;
		}

		if (engine.isInstanceOf[RemoteFlowEngine]) {
			val httpPort = 1225;
			val httpServletPath = "/rpc";
			val httpHost = "localhost";

			//start an RPC server which provide REPL generated classes to RemoteFlowEngine
			new HttpRpcServer().serve(classOf[ClassLoaderService].getName, new ClassLoaderServiceImpl(repl)).
				start(httpPort, httpServletPath);

			//notify RemoteFlowEngine
			engine.asInstanceOf[RemoteFlowEngine].
				registerRemoteClassLoaderServer(s"http://${httpHost}:${httpPort}${httpServletPath}");
		}

		val settings = new Settings;
		settings.feature.value = false;
		settings.Yreplsync.value = true;
		settings.Yreplclassbased.value = true;
		val dir = new File("/tmp/repl-classes/");
		FileUtils.deleteDirectory(dir);
		dir.mkdirs();
		settings.Yreploutdir.value = dir.getAbsolutePath;
		//settings.Ylogcp.value = true;
		//use when launching normally outside SBT
		settings.usejavacp.value = true;
		if (classPath != null) {
			logger.debug(s"class path: $classPath");
			settings.classpath.value = classPath;
		}

		settings.debug.value = false;
		repl.process(settings);
		spark.close();
	}
}

class ClassLoaderServiceImpl(repl: ILoop) extends ClassLoaderService with Logging {
	override def loadClassAsStream(name: String): Array[Byte] = {
		val loader = repl.classLoader;
		//val clazz =loader.loadClass(name);
		val resname = name.replace('.', '/') + ".class";
		val res = loader.getResource(resname);
		if (res == null)
			logger.warn(s"failed to loading class: $name, resource path: $resname");

		IOUtils.toByteArray(res.openStream());
	}
}